### New setup:
library(zoo)
library(gdata)
library(tidyverse)
library(readxl)
library(lubridate)
library(data.table)
library(ggplot2)
library(ggthemes)
library(sf)
library(sp)
library(rgeos)
require(foreach)
require(doSNOW)
require(iterators)

NumCores = 50
cl <- snow::makeCluster(rep('localhost',NumCores), type='SOCK')
registerDoSNOW(cl)

WORK = file.path('Z:/user/ajk41/Solar siting/Scripts/JAERE Code Repository/Data JAERE')
WORK.OUT = file.path(WORK, 'intermediate_output')
dir.create(WORK.OUT)

minyear = 2007
maxyear = 2016




####
or = read.csv(file=file.path(WORK, "ORISPL_to_NERC_v1.csv"), stringsAsFactors=F)
or = or[,c('ORISPL','NERC','tz')]

CEMf = file.path(WORK,'CEM') #--> Folder of raw CEM files
CEMfiles = list.files(path= CEMf, pattern = '.zip', recursive=T, full.names=T)
CEMfiles = grep('HLD', CEMfiles,  invert=T, value=T)
CEMfiles = grep('pr', CEMfiles, invert=T, value=T)
CEMfiles = grep('Q',CEMfiles, invert=T, value=T)  #--> some 2007 quarterly data snuck in.
##--> A function that takes the filepath, loads it in, fixes timezones
##    and aggregates at the NERC region level.
proc.CEM = function(xx, data.path = CEMf) {
  
  yy = read.csv(unzip(zipfile=file.path(xx)), stringsAsFactors = F)
  mass_names = grep('_MASS', names(yy), value=T)
  mass_names = grep('FLG',mass_names, invert=T, value=T)
  mass_names = c(grep('SO2',mass_names, value=T), grep('NOX',mass_names, value=T), grep('CO2',mass_names, value=T))
  
  
  yy = yy[,c('ORISPL_CODE','UNITID','OP_DATE','OP_HOUR',mass_names, grep('GLOAD', names(yy), value=T))]
  names(yy) = c('ORISPL_CODE','UNITID','OP_DATE','OP_HOUR','SO2_MASS','NOX_MASS','CO2_MASS','GLOAD_MW')
  
  yy[,5:8][is.na(yy[,5:8])] = 0    #-->UNITID shifts this one up
  
  yy = aggregate(cbind(SO2_MASS,NOX_MASS,CO2_MASS,GLOAD_MW) ~ ORISPL_CODE + OP_DATE + OP_HOUR, data=yy, FUN=sum)
  
  yy = merge(x=yy, y=or, by.x='ORISPL_CODE', by.y='ORISPL', all.x=T, all.y=F)
  
  yy = lapply(split(yy, f=list(yy$NERC, yy$tz), drop=T), function(zz) {
    zz$dh_e = with_tz(mdy_h(paste0(zz$OP_DATE, " ", zz$OP_HOUR), tz=zz$tz[1]), tzone='America/New_York')
    return(zz)}) #--> end lapply
  yy = do.call(rbind, yy)
  
  yy[,4:7][is.na(yy[,4:7])] = 0
  
  # if(sum(is.na(yy$SO2_MASS))==NROW(yy) & sum(is.na(yy$NOX_MASS))==NROW(yy)) yy[is.na(yy)] = 0
  
  #  yy = aggregate(cbind(SO2_MASS, NOX_MASS, CO2_MASS, GLOAD_MW) ~ dh_e + NERC, data=yy, FUN='sum', na.rm=T)
  
  dst.bad.dates = mdy('03-14-2010','11-07-2010',  #--> dropping all days with DST, since we estimate for weekday/weekend and month
                      '03-13-2011','11-06-2011',
                      '03-11-2012','11-04-2012',
                      '03-10-2013','11-03-2013',
                      '03-09-2014','11-02-2014',
                      '03-18-2015','11-01-2015',
                      '03-13-2016','11-06-2016',
                      '03-11-2007','11-04-2007',
                      '03-08-2008','11-02-2008',
                      '03-08-2009','11-01-2009')
  dst.bad.dates = c(dst.bad.dates, dst.bad.dates-1)
  
  yy = yy[which(!yy$dh_e%in%dst.bad.dates),]
  
  return(yy) } 





ptm = Sys.time()
CEMfiles2 <- foreach(xx=iter(CEMfiles, by='cell'), .inorder=F, .errorhandling='pass',
                     .packages=c('data.table','lubridate'),
                     .verbose=T) %dopar%{
  if(!'character'%in%class(xx)) return(xx)
  res=try(proc.CEM(xx))
  if(class(res)=='try-error')  return(xx)
  return(res)}

Sys.time()-ptm


sum(lapply(CEMfiles2, class)=='character') # should be zero if no read errors

dst.bad.dates = mdy('03/11/2007','11/04/2007',
                    '03/09/2008','11/02/2008',
                    '03/08/2009','11/01/2009',
                    '03/14/2010','11/07/2010',  #--> dropping all days with DST
                    '03/13/2011','11/06/2011',
                    '03/11/2012','11/04/2012',
                    '03/10/2013','11/03/2013',
                    '03/09/2014','11/02/2014',
                    '03/08/2015','11/01/2015', 
                    '03/13/2016','11/06/2016')


CEM = rbindlist(lapply(CEMfiles2, function(x) {
  x = as.data.table(x)[!date(dh_e)%in%dst.bad.dates,]
  x[,c('weekday','mo','hour','year'):=list((wday(dh_e)!=1&wday(dh_e)!=7),
                                          factor(month(dh_e)),
                                          factor(hour(dh_e)),
                                          factor(year(dh_e)))]
  x[!is.na(dh_e),]}))



CEM[,GLOAD_MASS:=GLOAD_MW]  #-->  in case any future greps use "MASS" to pull pollutants. Estimating GLOAD as it it were a pollutant; rescale to PM2.5
CEM[,c('GLOAD_MW', 'OP_DATE', 'OP_HOUR', 'tz'):=NULL]
setkey(CEM, ORISPL_CODE, dh_e)


df.split = split(subset(CEM, select=c('dh_e',grep(pattern = "_MASS|_CODE", names(CEM), value=T, invert=F), 'NERC','weekday')), by='ORISPL_CODE')
names(df.split) = paste0('ORISPL_',names(df.split))


saveRDS(df.split, file.path(WORK.OUT, 'using_dfsplit.rds'))

